# coding: utf-8
from __future__ import print_function, unicode_literals

import argparse
import errno
import logging
import os
import re
import shlex
import signal
import socket
import string
import sys
import threading
import time
from datetime import datetime

# from inspect import currentframe
# print(currentframe().f_lineno)


if True:  # pylint: disable=using-constant-test
    from types import FrameType

    import typing
    from typing import Any, Optional, Union

from .__init__ import ANYWIN, EXE, MACOS, PY2, TYPE_CHECKING, E, EnvParams, unicode
from .authsrv import BAD_CFG, AuthSrv
from .bos import bos
from .cert import ensure_cert
from .mtag import HAVE_FFMPEG, HAVE_FFPROBE, HAVE_MUTAGEN
from .pwhash import HAVE_ARGON2
from .tcpsrv import TcpSrv
from .th_srv import (
    HAVE_AVIF,
    HAVE_FFMPEG,
    HAVE_FFPROBE,
    HAVE_HEIF,
    HAVE_PIL,
    HAVE_VIPS,
    HAVE_WEBP,
    ThumbSrv,
)
from .up2k import Up2k
from .util import (
    DEF_EXP,
    DEF_MTE,
    DEF_MTH,
    FFMPEG_URL,
    HAVE_PSUTIL,
    HAVE_SQLITE3,
    HAVE_ZMQ,
    RE_ANSI,
    URL_BUG,
    UTC,
    VERSIONS,
    Daemon,
    Garda,
    HLog,
    HMaccas,
    ODict,
    alltrace,
    build_netmap,
    expat_ver,
    gzip,
    load_ipu,
    lock_file,
    min_ex,
    mp,
    odfusion,
    pybin,
    start_log_thrs,
    start_stackmon,
    ub64enc,
)

if HAVE_SQLITE3:
    import sqlite3

if TYPE_CHECKING:
    try:
        from .mdns import MDNS
        from .ssdp import SSDPd
    except:
        pass

if PY2:
    range = xrange  # type: ignore


# newest schema version ("sver" in the kv table) understood by this build for
# each sqlite database; setup_db / setup_share_db raise if the stored version is newer
VER_IDP_DB = 1  # idp-db
VER_SESSION_DB = 1  # sessions-db
VER_SHARES_DB = 2  # shares-db


class SvcHub(object):
    """
    Hosts all services which cannot be parallelized due to reliance on monolithic resources.
    Creates a Broker which does most of the heavy stuff; hosted services can use this to perform work:
        hub.broker.<say|ask>(destination, args_list).

    Either BrokerThr (plain threads) or BrokerMP (multiprocessing) is used depending on configuration.
    Nothing is returned synchronously; if you want any value returned from the call,
    put() can return a queue (if want_reply=True) which has a blocking get() with the response.
    """

    def __init__(
        self,
        args: argparse.Namespace,
        dargs: argparse.Namespace,
        argv: list[str],
        printed: str,
    ) -> None:
        """
        Apply the security presets, normalize and validate the config,
        set up logging and the optional databases, then create the
        hosted services and finally the broker.

        args: the effective config; many of its attributes are rewritten in-place here
        dargs: passed along to AuthSrv as its `dargs`
        argv: kept as self.argv
        printed: passed to _setup_logfile when a logfile (args.lo) is configured

        Raises Exception(BAD_CFG) if _process_config reports failure, and
        Exception for incompatible or invalid options (chpw combined with
        idp_h_usr; an unparseable q_mp3). Calls sys.exit(0) if args.exit is "cfg".
        Other exceptions may propagate from the helpers and services invoked here.
        """
        self.args = args
        self.dargs = dargs
        self.argv = argv
        self.E: EnvParams = args.E
        self.no_ansi = args.no_ansi
        self.tz = UTC if args.log_utc else None
        self.logf: Optional[typing.TextIO] = None
        self.logf_base_fn = ""
        self.is_dut = False  # running in unittest; always False
        self.stop_req = False
        self.stopping = False
        self.stopped = False
        self.reload_req = False
        self.reload_mutex = threading.Lock()
        self.stop_cond = threading.Condition()
        self.nsigs = 3
        self.retcode = 0
        self.httpsrv_up = 0  # incremented by cb_httpsrv_up

        self.log_mutex = threading.Lock()
        self.cday = 0
        self.cmon = 0
        self.tstack = 0.0

        self.iphash = HMaccas(os.path.join(self.E.cfg, "iphash"), 8)

        # the security presets cascade; sss (or s >= 3) turns on ss,
        # which turns on s, so each level also gets everything below it
        if args.sss or args.s >= 3:
            args.ss = True
            args.no_dav = True
            args.no_logues = True
            args.no_readme = True
            args.lo = args.lo or "cpp-%Y-%m%d-%H%M%S.txt.xz"
            args.ls = args.ls or "**,*,ln,p,r"

        if args.ss or args.s >= 2:
            args.s = True
            args.unpost = 0
            args.no_del = True
            args.no_mv = True
            args.hardlink = True
            args.vague_403 = True
            args.nih = True

        if args.s:
            args.dotpart = True
            args.no_thumb = True
            args.no_mtag_ff = True
            args.no_robots = True
            args.force_js = True

        if not self._process_config():
            raise Exception(BAD_CFG)

        # for non-http clients (ftp, tftp)
        self.bans: dict[str, int] = {}
        self.gpwd = Garda(self.args.ban_pw)
        self.gpwc = Garda(self.args.ban_pwc)
        self.g404 = Garda(self.args.ban_404)
        self.g403 = Garda(self.args.ban_403)
        # NOTE(review): g422 and gmal are both built from ban_422; only the second argument differs
        self.g422 = Garda(self.args.ban_422, False)
        self.gmal = Garda(self.args.ban_422)
        self.gurl = Garda(self.args.ban_url)

        # presumably log_tdec is the number of decimals in log timestamps -- TODO confirm
        self.log_div = 10 ** (6 - args.log_tdec)
        self.log_efmt = "%02d:%02d:%02d.%0{}d".format(args.log_tdec)
        self.log_dfmt = "%04d-%04d-%06d.%0{}d".format(args.log_tdec)
        # args.q selects the disabled logger
        self.log = self._log_disabled if args.q else self._log_enabled
        if args.lo:
            self._setup_logfile(printed)

        # make self.log the only handler of the root logger of the stdlib logging module
        lg = logging.getLogger()
        lh = HLog(self.log)
        lg.handlers = [lh]
        lg.setLevel(logging.DEBUG)

        self._check_env()

        if args.stackmon:
            start_stackmon(args.stackmon, 0)

        if args.log_thrs:
            start_log_thrs(self.log, args.log_thrs, 0)

        # fpool gets disabled whenever -j is not 1; the first check covers
        # the implicit case, the second an explicitly requested --use-fpool
        if not args.use_fpool and args.j != 1:
            args.no_fpool = True
            t = "multithreading enabled with -j {}, so disabling fpool -- this can reduce upload performance on some filesystems, and make some antivirus-softwares "
            c = 0
            if ANYWIN:
                t += "(especially Microsoft Defender) stress your CPU and HDD severely during big uploads"
                c = 3
            else:
                t += "consume more resources (CPU/HDD) than normal"
            self.log("root", t.format(args.j), c)

        if not args.no_fpool and args.j != 1:
            t = "WARNING: ignoring --use-fpool because multithreading (-j{}) is enabled"
            self.log("root", t.format(args.j), c=3)
            args.no_fpool = True

        # sanity-check the buffer sizes; this only warns, the config is not modified
        for name, arg in (
            ("iobuf", "iobuf"),
            ("s-rd-sz", "s_rd_sz"),
            ("s-wr-sz", "s_wr_sz"),
        ):
            zi = getattr(args, arg)
            if zi < 32768:
                t = "WARNING: expect very poor performance because you specified a very low value (%d) for --%s"
                self.log("root", t % (zi, name), 3)
                # 2 is a power of two, so the power-of-two warning below is not emitted as well
                zi = 2
            zi2 = 2 ** (zi - 1).bit_length()  # smallest power of two which is >= zi
            if zi != zi2:
                zi3 = 2 ** ((zi - 1).bit_length() - 1)  # the power of two below zi2
                t = "WARNING: expect poor performance because --%s is not a power-of-two; consider using %d or %d instead of %d"
                self.log("root", t % (name, zi2, zi3, zi), 3)

        if args.s_rd_sz > args.iobuf:
            t = "WARNING: --s-rd-sz (%d) is larger than --iobuf (%d); this may lead to reduced performance"
            self.log("root", t % (args.s_rd_sz, args.iobuf), 3)

        # warn about which thumbnailer features will not fit in the configured ram budget
        zs = ""
        if args.th_ram_max < 0.22:
            zs = "generate thumbnails"
        elif args.th_ram_max < 1:
            zs = "generate audio waveforms or spectrograms"
        if zs:
            t = "WARNING: --th-ram-max is very small (%.2f GiB); will not be able to %s"
            self.log("root", t % (args.th_ram_max, zs), 3)

        if args.chpw and args.idp_h_usr:
            t = "ERROR: user-changeable passwords is incompatible with IdP/identity-providers; you must disable either --chpw or --idp-h-usr"
            self.log("root", t, 1)
            raise Exception(t)

        # flatten the comma-separated chpw_no values into a set of non-empty names
        noch = set()
        for zs in args.chpw_no or []:
            zsl = [x.strip() for x in zs.split(",")]
            noch.update([x for x in zsl if x])
        args.chpw_no = noch

        if args.ipu:
            iu, nm = load_ipu(self.log, args.ipu, True)
            setattr(args, "ipu_iu", iu)
            setattr(args, "ipu_nm", nm)

        # log the effective salts if requested through the matching show_<name> option
        for zs in "ah_salt fk_salt dk_salt".split():
            if getattr(args, "show_%s" % (zs,)):
                self.log("root", "effective %s is %s" % (zs, getattr(args, zs)))

        # ah_cli / ah_gen turn off the idp-db, sessions and shares, so none of those databases are opened
        if args.ah_cli or args.ah_gen:
            args.idp_store = 0
            args.no_ses = True
            args.shr = ""

        if args.idp_store and args.idp_h_usr:
            self.setup_db("idp")

        if not self.args.no_ses:
            self.setup_db("ses")

        # setup_share_db assigns the real shr1 if shares are enabled
        args.shr1 = ""
        if args.shr:
            self.setup_share_db()

        # convert the numeric theme into the string form; "z" for even numbers,
        # "y" for odd numbers, and one letter a..x selected by half the number
        bri = "zy"[args.theme % 2 :][:1]
        ch = "abcdefghijklmnopqrstuvwx"[int(args.theme / 2)]
        args.theme = "{0}{1} {0} {1}".format(ch, bri)

        # with nih, the server name is left out of doctitle and bname
        if args.nih:
            args.vname = ""
            args.doctitle = args.doctitle.replace(" @ --name", "")
        else:
            args.vname = args.name
        args.doctitle = args.doctitle.replace("--name", args.vname)
        args.bname = args.bname.replace("--name", args.vname) or args.vname

        if args.log_fk:
            args.log_fk = re.compile(args.log_fk)

        # initiate all services to manage
        self.asrv = AuthSrv(self.args, self.log, dargs=self.dargs)

        if args.cgen:
            self.asrv.cgen()

        if args.exit == "cfg":
            sys.exit(0)

        if args.ls:
            self.asrv.dbg_ls()

        if not ANYWIN:
            self._setlimits()

        self.log("root", "max clients: {}".format(self.args.nc))

        self.tcpsrv = TcpSrv(self)

        if not self.tcpsrv.srv and self.args.ign_ebind_all:
            self.args.no_fastboot = True

        self.up2k = Up2k(self)

        self._feature_test()

        # drop the thumbnail decoders whose dependencies are missing
        decs = {k: 1 for k in self.args.th_dec.split(",")}
        if not HAVE_VIPS:
            decs.pop("vips", None)
        if not HAVE_PIL:
            decs.pop("pil", None)
        if not HAVE_FFMPEG or not HAVE_FFPROBE:
            decs.pop("ff", None)

        # compressed formats; "s3z=s3m.zip, s3gz=s3m.gz, ..."
        zlss = [x.strip().lower().split("=", 1) for x in args.au_unpk.split(",")]
        args.au_unpk = {x[0]: x[1] for x in zlss}

        self.args.th_dec = list(decs.keys())
        self.thumbsrv = None
        want_ff = False  # becomes True if FFmpeg would have made a difference
        if not args.no_thumb:
            t = ", ".join(self.args.th_dec) or "(None available)"
            self.log("thumb", "decoder preference: {}".format(t))

            if "pil" in self.args.th_dec and not HAVE_WEBP:
                msg = "disabling webp thumbnails because either libwebp is not available or your Pillow is too old"
                self.log("thumb", msg, c=3)

            if self.args.th_dec:
                self.thumbsrv = ThumbSrv(self)
            else:
                want_ff = True
                msg = "need either Pillow, pyvips, or FFmpeg to create thumbnails; for example:\n{0}{1} -m pip install --user Pillow\n{0}{1} -m pip install --user pyvips\n{0}apt install ffmpeg"
                msg = msg.format(" " * 37, os.path.basename(pybin))
                if EXE:
                    msg = "copyparty.exe cannot use Pillow or pyvips; need ffprobe.exe and ffmpeg.exe to create thumbnails"

                self.log("thumb", msg, c=3)

        if not args.no_acode and args.no_thumb:
            msg = "setting --no-acode because --no-thumb (sorry)"
            self.log("thumb", msg, c=6)
            args.no_acode = True

        if not args.no_acode and (not HAVE_FFMPEG or not HAVE_FFPROBE):
            msg = "setting --no-acode because either FFmpeg or FFprobe is not available"
            self.log("thumb", msg, c=6)
            args.no_acode = True
            want_ff = True

        if want_ff and ANYWIN:
            self.log("thumb", "download FFmpeg to fix it:\033[0m " + FFMPEG_URL, 3)

        if not args.no_acode:
            if not re.match("^(0|[qv][0-9]|[0-9]{2,3}k)$", args.q_mp3.lower()):
                t = "invalid mp3 transcoding quality [%s] specified; only supports [0] to disable, a CBR value such as [192k], or a CQ/CRF value such as [v2]"
                raise Exception(t % (args.q_mp3,))
        else:
            # the unpack table is emptied when audio transcoding is off
            args.au_unpk = {}

        # th_poke must not exceed either of the maxage values
        args.th_poke = min(args.th_poke, args.th_maxage, args.ac_maxage)

        # zms collects one letter per enabled protocol (d, D, f/F, s);
        # it becomes args.zms further below unless that was already set
        zms = ""
        if not args.https_only:
            zms += "d"
        if not args.http_only:
            zms += "D"

        if args.ftp or args.ftps:
            from .ftpd import Ftpd

            self.ftpd: Optional[Ftpd] = None
            zms += "f" if args.ftp else "F"

        if args.tftp:
            from .tftpd import Tftpd

            self.tftpd: Optional[Tftpd] = None

        if args.ftp or args.ftps or args.tftp:
            # start_ftpd sleeps a while, then starts whichever of the servers are still unset
            Daemon(self.start_ftpd, "start_tftpd")

        if args.smb:
            # impacket.dcerpc is noisy about listen timeouts
            sto = socket.getdefaulttimeout()
            socket.setdefaulttimeout(None)

            from .smbd import SMB

            self.smbd = SMB(self)
            socket.setdefaulttimeout(sto)
            self.smbd.start()
            zms += "s"

        if not args.zms:
            args.zms = zms

        self.zc_ngen = 0
        self.mdns: Optional["MDNS"] = None
        self.ssdp: Optional["SSDPd"] = None

        # decide which worker impl to use
        if self.check_mp_enable():
            from .broker_mp import BrokerMp as Broker
        else:
            from .broker_thr import BrokerThr as Broker  # type: ignore

        self.broker = Broker(self)

        # create netmaps early to avoid firewall gaps,
        # but the mutex blocks multiprocessing startup
        for zs in "ipu_iu ftp_ipa_nm tftp_ipa_nm".split():
            try:
                # best-effort; the attribute may be missing (ipu_iu is only set if args.ipu)
                getattr(args, zs).mutex = threading.Lock()
            except:
                pass

    def _db_onfail_ses(self) -> None:
        self.args.no_ses = True

    def _db_onfail_idp(self) -> None:
        self.args.idp_store = 0

    def setup_db(self, which: str) -> None:
        """
        the "non-mission-critical" databases; if something looks broken then just nuke it
        """
        if which == "ses":
            native_ver = VER_SESSION_DB
            db_path = self.args.ses_db
            desc = "sessions-db"
            pathopt = "ses-db"
            sanchk_q = "select count(*) from us"
            createfun = self._create_session_db
            failfun = self._db_onfail_ses
        elif which == "idp":
            native_ver = VER_IDP_DB
            db_path = self.args.idp_db
            desc = "idp-db"
            pathopt = "idp-db"
            sanchk_q = "select count(*) from us"
            createfun = self._create_idp_db
            failfun = self._db_onfail_idp
        else:
            raise Exception("unknown cachetype")

        if not db_path.endswith(".db"):
            zs = "config option --%s (the %s) was configured to [%s] which is invalid; must be a filepath ending with .db"
            self.log("root", zs % (pathopt, desc, db_path), 1)
            raise Exception(BAD_CFG)

        if not HAVE_SQLITE3:
            failfun()
            if which == "ses":
                zs = "disabling sessions, will use plaintext passwords in cookies"
            elif which == "idp":
                zs = "disabling idp-db, will be unable to remember IdP-volumes after a restart"
            self.log("root", "WARNING: sqlite3 not available; %s" % (zs,), 3)
            return

        assert sqlite3  # type: ignore  # !rm

        db_lock = db_path + ".lock"
        try:
            create = not os.path.getsize(db_path)
        except:
            create = True
        zs = "creating new" if create else "opening"
        self.log("root", "%s %s %s" % (zs, desc, db_path))

        for tries in range(2):
            sver = 0
            try:
                db = sqlite3.connect(db_path)
                cur = db.cursor()
                try:
                    zs = "select v from kv where k='sver'"
                    sver = cur.execute(zs).fetchall()[0][0]
                    if sver > native_ver:
                        zs = "this version of copyparty only understands %s v%d and older; the db is v%d"
                        raise Exception(zs % (desc, native_ver, sver))

                    cur.execute(sanchk_q).fetchone()
                except:
                    if sver:
                        raise
                    sver = createfun(cur)

                err = self._verify_db(
                    cur, which, pathopt, db_path, desc, sver, native_ver
                )
                if err:
                    tries = 99
                    self.args.no_ses = True
                    self.log("root", err, 3)
                break

            except Exception as ex:
                if tries or sver > native_ver:
                    raise
                t = "%s is unusable; deleting and recreating: %r"
                self.log("root", t % (desc, ex), 3)
                try:
                    cur.close()  # type: ignore
                except:
                    pass
                try:
                    db.close()  # type: ignore
                except:
                    pass
                try:
                    os.unlink(db_lock)
                except:
                    pass
                os.unlink(db_path)

    def _create_session_db(self, cur: "sqlite3.Cursor") -> int:
        sch = [
            r"create table kv (k text, v int)",
            r"create table us (un text, si text, t0 int)",
            # username, session-id, creation-time
            r"create index us_un on us(un)",
            r"create index us_si on us(si)",
            r"create index us_t0 on us(t0)",
            r"insert into kv values ('sver', 1)",
        ]
        for cmd in sch:
            cur.execute(cmd)
        self.log("root", "created new sessions-db")
        return 1

    def _create_idp_db(self, cur: "sqlite3.Cursor") -> int:
        sch = [
            r"create table kv (k text, v int)",
            r"create table us (un text, gs text)",
            # username, groups
            r"create index us_un on us(un)",
            r"insert into kv values ('sver', 1)",
        ]
        for cmd in sch:
            cur.execute(cmd)
        self.log("root", "created new idp-db")
        return 1

    def _verify_db(
        self,
        cur: "sqlite3.Cursor",
        which: str,
        pathopt: str,
        db_path: str,
        desc: str,
        sver: int,
        native_ver: int,
    ) -> str:
        """
        claim the database for this process by taking its lockfile and
        storing our pid and a millisecond timestamp in the kv table; also
        raises the stored schema version to native_ver if it was older

        on success, everything is committed, the cursor and its connection
        are closed, and "" is returned; if the lockfile could not be taken,
        a warning message is returned and the cursor/connection are left open

        raises Exception if `which` is neither "ses" nor "idp"
        """
        # ensure writable (maybe owned by other user)
        conn = cur.connection

        # pid stored by whoever claimed the db last;
        # 0 if there is none, or if the value is ambiguous or unreadable
        try:
            pid_rows = cur.execute("select v from kv where k='pid'").fetchall()
            if len(pid_rows) > 1:
                raise Exception()
            owner = pid_rows[0][0]
        except:
            owner = 0

        consequences = {
            "ses": "Will now disable sessions and instead use plaintext passwords in cookies.",
            "idp": "Each IdP-volume will not become available until its associated user sends their first request.",
        }
        if which not in consequences:
            raise Exception()
        cons = consequences[which]

        locked = lock_file(db_path + ".lock")
        if not locked:
            t = "the %s [%s] is already in use by another copyparty instance (pid:%d). This is not supported; please provide another database with --%s or give this copyparty-instance its entirely separate config-folder by setting another path in the XDG_CONFIG_HOME env-var. You can also disable this safeguard by setting env-var PRTY_NO_DB_LOCK=1. %s"
            return t % (desc, db_path, owner, pathopt, cons)

        stamps = (("pid", os.getpid()), ("ts", int(time.time() * 1000)))
        if owner:
            # wear-estimate: 2 cells; offsets 0x10, 0x50, 0x19720
            query = "update kv set v=? where k=?"
            rows = [(val, key) for key, val in stamps]
        else:
            # wear-estimate: 3~4 cells; offsets 0x10, 0x50, 0x19180, 0x19710, 0x36000, 0x360b0, 0x36b90
            query = "insert into kv values(?, ?)"
            rows = [(key, val) for key, val in stamps]
        for row in rows:
            cur.execute(query, row)

        if sver < native_ver:
            cur.execute("delete from kv where k='sver'")
            cur.execute("insert into kv values('sver',?)", (native_ver,))

        conn.commit()
        cur.close()
        conn.close()
        return ""

    def setup_share_db(self) -> None:
        """
        validate the shr option and open the shares-db, creating it or
        upgrading its schema as necessary, then claim it for this process
        (lockfile + pid and timestamp in the kv table)

        if sqlite3 is not available, shares are disabled (shr is emptied)
        and nothing else happens

        raises Exception if shr is not a single toplevel directory name,
        if the db cannot be opened or is newer than VER_SHARES_DB,
        or if the db is locked by another instance
        """
        args = self.args
        if not HAVE_SQLITE3:
            self.log("root", "sqlite3 not available; disabling --shr", 1)
            args.shr = ""
            return

        assert sqlite3  # type: ignore  # !rm

        name = args.shr.strip("/")
        args.shr = name
        if not name or "/" in name:
            t = "config error: --shr must be the name of a virtual toplevel directory to put shares inside"
            self.log("root", t, 1)
            raise Exception(t)

        # shr has both a leading and trailing slash; shr1 only the trailing one
        args.shr = "/%s/" % (name,)
        args.shr1 = args.shr[1:]

        # policy:
        # the shares-db is important, so panic if something is wrong

        db_path = self.args.shr_db
        lock_path = db_path + ".lock"
        try:
            is_new = not os.path.getsize(db_path)
        except:
            is_new = True
        action = "creating new" if is_new else "opening"
        self.log("root", "%s shares-db %s" % (action, db_path))

        sver = 0
        try:
            db = sqlite3.connect(db_path)
            cur = db.cursor()
            if not is_new:
                query = "select v from kv where k='sver'"
                sver = cur.execute(query).fetchall()[0][0]
                if sver > VER_SHARES_DB:
                    zs = "this version of copyparty only understands shares-db v%d and older; the db is v%d"
                    raise Exception(zs % (VER_SHARES_DB, sver))

                # sanity-check; fails if the shares table is missing
                cur.execute("select count(*) from sh").fetchone()
        except Exception as ex:
            t = "could not open shares-db; will now panic...\nthe following database must be repaired or deleted before you can launch copyparty:\n%s\n\nERROR: %s\n\nadditional details:\n%s\n"
            self.log("root", t % (db_path, ex, min_ex()), 1)
            raise

        # pid stored by whoever claimed the db last;
        # 0 if there is none, or if the value is ambiguous or unreadable
        try:
            pid_rows = cur.execute("select v from kv where k='pid'").fetchall()
            if len(pid_rows) > 1:
                raise Exception()
            owner = pid_rows[0][0]
        except:
            owner = 0

        if not lock_file(lock_path):
            t = "the shares-db [%s] is already in use by another copyparty instance (pid:%d). This is not supported; please provide another database with --shr-db or give this copyparty-instance its entirely separate config-folder by setting another path in the XDG_CONFIG_HOME env-var. You can also disable this safeguard by setting env-var PRTY_NO_DB_LOCK=1. Will now panic."
            t = t % (db_path, owner)
            self.log("root", t, 1)
            raise Exception(t)

        # statements for the v1 schema
        schema_v1 = [
            r"create table kv (k text, v int)",
            r"create table sh (k text, pw text, vp text, pr text, st int, un text, t0 int, t1 int)",
            # sharekey, password, src, perms, numFiles, owner, created, expires
        ]
        # additional statements which bring a v1 db to v2
        schema_v2 = [
            r"create table sf (k text, vp text)",
            r"create index sf_k on sf(k)",
            r"create index sh_k on sh(k)",
            r"create index sh_t1 on sh(t1)",
            r"insert into kv values ('sver', 2)",
        ]

        assert db  # type: ignore  # !rm
        assert cur  # type: ignore  # !rm
        if not sver:
            # blank db; create everything
            sver = VER_SHARES_DB
            for stmt in schema_v1 + schema_v2:
                cur.execute(stmt)
            self.log("root", "created new shares-db")

        if sver == 1:
            for stmt in schema_v2:
                cur.execute(stmt)
            cur.execute("update sh set st = 0")
            self.log("root", "shares-db schema upgrade ok")

        if sver < VER_SHARES_DB:
            # replace whichever sver rows exist with exactly one current row
            cur.execute("delete from kv where k='sver'")
            cur.execute("insert into kv values('sver',?)", (VER_SHARES_DB,))

        stamps = (("pid", os.getpid()), ("ts", int(time.time() * 1000)))
        if owner:
            # wear-estimate: same as sessions-db
            query = "update kv set v=? where k=?"
            rows = [(val, key) for key, val in stamps]
        else:
            query = "insert into kv values(?, ?)"
            rows = [(key, val) for key, val in stamps]
        for row in rows:
            cur.execute(query, row)

        db.commit()
        cur.close()
        db.close()

    def start_ftpd(self) -> None:
        """
        sleep for 30 seconds, then start each of the ftp / tftp servers
        which is enabled (the attribute exists) but has not been created yet
        """
        time.sleep(30)

        pending = (
            ("ftpd", self.restart_ftpd),
            ("tftpd", self.restart_tftpd),
        )
        for attr, starter in pending:
            if hasattr(self, attr) and not getattr(self, attr):
                starter()

    def restart_ftpd(self) -> None:
        if not hasattr(self, "ftpd"):
            return

        from .ftpd import Ftpd

        if self.ftpd:
            return  # todo

        if not os.path.exists(self.args.cert):
            ensure_cert(self.log, self.args)

        self.ftpd = Ftpd(self)
        self.log("root", "started FTPd")

    def restart_tftpd(self) -> None:
        if not hasattr(self, "tftpd"):
            return

        from .tftpd import Tftpd

        if self.tftpd:
            return  # todo

        self.tftpd = Tftpd(self)

    def thr_httpsrv_up(self) -> None:
        """
        wait a moment (1 sec with ign_ebind_all, otherwise 5), then compare the
        number of httpsrv_up reports against workers * listeners;
        if some are missing and the ign_ebind options do not excuse it,
        log an error, set retcode to 1 and initiate shutdown
        """
        args = self.args
        time.sleep(1 if args.ign_ebind_all else 5)

        expected = self.broker.num_workers * self.tcpsrv.nsrv
        failed = expected - self.httpsrv_up
        if not failed:
            return

        if args.ign_ebind_all:
            if not self.tcpsrv.srv:
                # nothing is listening; send one cb_httpsrv_up per worker through the broker
                nworkers = self.broker.num_workers
                for _ in range(nworkers):
                    self.broker.say("cb_httpsrv_up")
            return

        if args.ign_ebind and self.tcpsrv.srv:
            return

        msg = "{}/{} workers failed to start".format(failed, expected)
        self.log("root", msg, 1)

        self.retcode = 1
        self.sigterm()

    def sigterm(self) -> None:
        self.signal_handler(signal.SIGTERM, None)

    def cb_httpsrv_up(self) -> None:
        self.httpsrv_up += 1
        if self.httpsrv_up != self.broker.num_workers:
            return

        ar = self.args
        for _ in range(10 if ar.ftp or ar.ftps else 0):
            time.sleep(0.03)
            if self.ftpd:
                break

        if self.tcpsrv.qr:
            self.log("qr-code", self.tcpsrv.qr)
        else:
            self.log("root", "workers OK\n")

        self.after_httpsrv_up()

    def after_httpsrv_up(self) -> None:
        """initialize the up2k volumes, then run sd_notify in a Daemon named "sd-notify\""""
        up2k = self.up2k
        up2k.init_vols()

        notifier = self.sd_notify
        Daemon(notifier, "sd-notify")

    def _feature_test(self) -> None:
        """
        log which optional dependencies were found; either one line per
        dependency (if the deps option is set), or a single summary line
        """
        t_ff = "transcode audio, create spectrograms, video thumbnails"
        # (available, name, what it is used for)
        to_check = [
            (HAVE_SQLITE3, "sqlite", "sessions and file/media indexing"),
            (HAVE_PIL, "pillow", "image thumbnails (plenty fast)"),
            (HAVE_VIPS, "vips", "image thumbnails (faster, eats more ram)"),
            (HAVE_WEBP, "pillow-webp", "create thumbnails as webp files"),
            (HAVE_FFMPEG, "ffmpeg", t_ff + ", good-but-slow image thumbnails"),
            (HAVE_FFPROBE, "ffprobe", t_ff + ", read audio/media tags"),
            (HAVE_MUTAGEN, "mutagen", "read audio tags (ffprobe is better but slower)"),
            (HAVE_ARGON2, "argon2", "secure password hashing (advanced users only)"),
            (HAVE_ZMQ, "pyzmq", "send zeromq messages from event-hooks"),
            (HAVE_HEIF, "pillow-heif", "read .heif images with pillow (rarely useful)"),
            (HAVE_AVIF, "pillow-avif", "read .avif images with pillow (rarely useful)"),
        ]
        if ANYWIN:
            to_check.append(
                (HAVE_PSUTIL, "psutil", "improved plugin cleanup  (rarely useful)")
            )

        if self.args.deps:
            # verbose listing, framed by blank lines
            self.log("dependencies", "")
            for have, feat, what in to_check:
                if have:
                    color, sgot = 2, "found"
                else:
                    color, sgot = 5, "missing"
                line = "%7s: %s \033[36m(%s)" % (sgot, feat, what)
                self.log("dependencies", line, color)
            self.log("dependencies", "")
            return

        sok = ", ".join(feat for have, feat, _ in to_check if have)
        sng = ", ".join(feat for have, feat, _ in to_check if not have)

        parts = []
        if sok:
            parts.append("OK: \033[32m" + sok)
        if sng:
            parts.append("\033[0mNG: \033[35m" + sng)

        summary = ", ".join(parts) + "\033[0m, see --deps (this is fine btw)"
        self.log("optional-dependencies", summary, 6)

    def _check_env(self) -> None:
        """
        warn about suspicious aspects of the environment and config:
        stray .conf files in the config folder, the no_bauth option,
        zeroconf options when only listening on unix-sockets (these get
        disabled), and a failed dxml selftest (which disables WebDAV)
        """
        args = self.args

        try:
            cfg_files = os.listdir(E.cfg)
        except:
            cfg_files = []

        stray = [fn for fn in cfg_files if fn.lower().endswith(".conf")]
        if stray:
            t = "WARNING: found config files in [%s]: %s\n  config files are not expected here, and will NOT be loaded (unless your setup is intentionally hella funky)"
            self.log("root", t % (E.cfg, ", ".join(stray)), 3)

        if args.no_bauth:
            t = "WARNING: --no-bauth disables support for the Android app; you may want to use --bauth-last instead"
            self.log("root", t, 3)
            if args.bauth_last:
                self.log("root", "WARNING: ignoring --bauth-last due to --no-bauth", 3)

        # true if every listening address is a unix-socket (or if there are none)
        unix_only = all(addr.startswith("unix:") for addr in args.i)
        if unix_only:
            disabled_any = False
            for opt in "z zm zm4 zm6 zmv zmvv zs zsv zv".split():
                if getattr(args, opt, False):
                    setattr(args, opt, False)
                    disabled_any = True
            if disabled_any:
                t = "only listening on unix-sockets; cannot enable zeroconf/mdns/ssdp as requested"
                self.log("root", t, 3)

        if not args.no_dav:
            from .dxml import DXML_OK

            if not DXML_OK:
                args.no_dav = True
                t = "WARNING:\nDisabling WebDAV support because dxml selftest failed. Please report this bug;\n%s\n...and include the following information in the bug-report:\n%s | expat %s\n"
                self.log("root", t % (URL_BUG, VERSIONS, expat_ver()), 1)

    def _process_config(self) -> bool:
        al = self.args

        al.zm_on = al.zm_on or al.z_on
        al.zs_on = al.zs_on or al.z_on
        al.zm_off = al.zm_off or al.z_off
        al.zs_off = al.zs_off or al.z_off
        ns = "zm_on zm_off zs_on zs_off acao acam"
        for n in ns.split(" "):
            vs = getattr(al, n).split(",")
            vs = [x.strip() for x in vs]
            vs = [x for x in vs if x]
            setattr(al, n, vs)

        ns = "acao acam"
        for n in ns.split(" "):
            vs = getattr(al, n)
            vd = {zs: 1 for zs in vs}
            setattr(al, n, vd)

        ns = "acao"
        for n in ns.split(" "):
            vs = getattr(al, n)
            vs = [x.lower() for x in vs]
            setattr(al, n, vs)

        R = al.rp_loc
        if "//" in R or ":" in R:
            t = "found URL in --rp-loc; it should be just the location, for example /foo/bar"
            raise Exception(t)

        al.R = R = R.strip("/")
        al.SR = "/" + R if R else ""
        al.RS = R + "/" if R else ""
        al.SRS = "/" + R + "/" if R else "/"

        if al.rsp_jtr:
            al.rsp_slp = 0.000001

        zsl = al.th_covers.split(",")
        zsl = [x.strip() for x in zsl]
        zsl = [x for x in zsl if x]
        al.th_covers = zsl
        al.th_coversd = zsl + ["." + x for x in zsl]
        al.th_covers_set = set(al.th_covers)
        al.th_coversd_set = set(al.th_coversd)

        for k in "c".split(" "):
            vl = getattr(al, k)
            if not vl:
                continue

            vl = [os.path.expandvars(os.path.expanduser(x)) for x in vl]
            setattr(al, k, vl)

        for k in "lo hist dbpath ssl_log".split(" "):
            vs = getattr(al, k)
            if vs:
                vs = os.path.expandvars(os.path.expanduser(vs))
                setattr(al, k, vs)

        for k in "idp_adm".split(" "):
            vs = getattr(al, k)
            vsa = [x.strip() for x in vs.split(",")]
            vsa = [x.lower() for x in vsa if x]
            setattr(al, k + "_set", set(vsa))

        zs = "dav_ua1 sus_urls nonsus_urls ua_nodoc ua_nozip"
        for k in zs.split(" "):
            vs = getattr(al, k)
            if not vs or vs == "no":
                setattr(al, k, None)
            else:
                setattr(al, k, re.compile(vs))

        for k in "tftp_lsf".split(" "):
            vs = getattr(al, k)
            if not vs or vs == "no":
                setattr(al, k, None)
            else:
                setattr(al, k, re.compile("^" + vs + "$"))

        if not al.sus_urls:
            al.ban_url = "no"
        elif al.ban_url == "no":
            al.sus_urls = None

        al.xff_hdr = al.xff_hdr.lower()
        al.idp_h_usr = al.idp_h_usr.lower()
        al.idp_h_grp = al.idp_h_grp.lower()
        al.idp_h_key = al.idp_h_key.lower()

        al.ftp_ipa_nm = build_netmap(al.ftp_ipa or al.ipa, True)
        al.tftp_ipa_nm = build_netmap(al.tftp_ipa or al.ipa, True)

        mte = ODict.fromkeys(DEF_MTE.split(","), True)
        al.mte = odfusion(mte, al.mte)

        mth = ODict.fromkeys(DEF_MTH.split(","), True)
        al.mth = odfusion(mth, al.mth)

        exp = ODict.fromkeys(DEF_EXP.split(" "), True)
        al.exp_md = odfusion(exp, al.exp_md.replace(" ", ","))
        al.exp_lg = odfusion(exp, al.exp_lg.replace(" ", ","))

        for k in ["no_hash", "no_idx", "og_ua", "srch_excl"]:
            ptn = getattr(self.args, k)
            if ptn:
                setattr(self.args, k, re.compile(ptn))

        for k in ["idp_gsep"]:
            ptn = getattr(self.args, k)
            if "]" in ptn:
                ptn = "]" + ptn.replace("]", "")
            if "[" in ptn:
                ptn = ptn.replace("[", "") + "["
            if "-" in ptn:
                ptn = ptn.replace("-", "") + "-"

            ptn = ptn.replace("\\", "\\\\").replace("^", "\\^")
            setattr(self.args, k, re.compile("[%s]" % (ptn,)))

        try:
            zf1, zf2 = self.args.rm_retry.split("/")
            self.args.rm_re_t = float(zf1)
            self.args.rm_re_r = float(zf2)
        except:
            raise Exception("invalid --rm-retry [%s]" % (self.args.rm_retry,))

        try:
            zf1, zf2 = self.args.mv_retry.split("/")
            self.args.mv_re_t = float(zf1)
            self.args.mv_re_r = float(zf2)
        except:
            raise Exception("invalid --mv-retry [%s]" % (self.args.mv_retry,))

        al.tcolor = al.tcolor.lstrip("#")
        if len(al.tcolor) == 3:  # fc5 => ffcc55
            al.tcolor = "".join([x * 2 for x in al.tcolor])

        zs = al.u2sz
        zsl = zs.split(",")
        if len(zsl) not in (1, 3):
            t = "invalid --u2sz; must be either one number, or a comma-separated list of three numbers (min,default,max)"
            raise Exception(t)
        if len(zsl) < 3:
            zsl = ["1", zs, zs]
        zi2 = 1
        for zs in zsl:
            zi = int(zs)
            # arbitrary constraint (anything above 2 GiB is probably unintended)
            if zi < 1 or zi > 2047:
                raise Exception("invalid --u2sz; minimum is 1, max is 2047")
            if zi < zi2:
                raise Exception("invalid --u2sz; values must be equal or ascending")
            zi2 = zi
        al.u2sz = ",".join(zsl)

        return True

    def _ipa2re(self, txt) -> Optional[re.Pattern]:
        if txt in ("any", "0", ""):
            return None

        zs = txt.replace(" ", "").replace(".", "\\.").replace(",", "|")
        return re.compile("^(?:" + zs + ")")

    def _setlimits(self) -> None:
        try:
            import resource

            soft, hard = [
                int(x) if x > 0 else 1024 * 1024
                for x in list(resource.getrlimit(resource.RLIMIT_NOFILE))
            ]
        except:
            self.log("root", "failed to read rlimits from os", 6)
            return

        if not soft or not hard:
            t = "got bogus rlimits from os ({}, {})"
            self.log("root", t.format(soft, hard), 6)
            return

        want = self.args.nc * 4
        new_soft = min(hard, want)
        if new_soft < soft:
            return

        # t = "requesting rlimit_nofile({}), have {}"
        # self.log("root", t.format(new_soft, soft), 6)

        try:
            import resource

            resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
            soft = new_soft
        except:
            t = "rlimit denied; max open files: {}"
            self.log("root", t.format(soft), 3)
            return

        if soft < want:
            t = "max open files: {} (wanted {} for -nc {})"
            self.log("root", t.format(soft, want, self.args.nc), 3)
            self.args.nc = min(self.args.nc, soft // 2)

    def _logname(self) -> str:
        dt = datetime.now(self.tz)
        fn = str(self.args.lo)
        for fs in "YmdHMS":
            fs = "%" + fs
            if fs in fn:
                fn = fn.replace(fs, dt.strftime(fs))

        return fn

    def _setup_logfile(self, printed: str) -> None:
        """
        Open the logfile and make it the active one (`self.logf`).

        The filename comes from `_logname()`. If it differs from `args.lo`
        and the file already exists, a numeric suffix is appended until an
        unused name is found. Names ending in ".xz" are written through
        lzma, which also turns on `args.no_logflush`. The new file starts
        with t0, the quoted argv, and `printed` plus an "opened logfile"
        line, which is also printed to stdout.
        """
        base_fn = self._logname()
        do_xz = base_fn.lower().endswith(".xz")

        fn = base_fn
        if base_fn != self.args.lo:
            # yup this is a race; if started sufficiently concurrently, two
            # copyparties can grab the same logfile (considered and ignored)
            ctr = 0
            while os.path.exists(fn):
                ctr += 1
                fn = "{}.{}".format(base_fn, ctr)

        try:
            bos.makedirs(os.path.dirname(fn))
        except:
            pass

        try:
            if not do_xz:
                lh = open(fn, "wt", encoding="utf-8", errors="replace")
            else:
                import lzma

                lh = lzma.open(fn, "wt", encoding="utf-8", errors="replace", preset=0)
                self.args.no_logflush = True
        except:
            # fallback for when the open above did not work
            import codecs

            lh = codecs.open(fn, "w", encoding="utf-8", errors="replace")

        if getattr(self.args, "free_umask", False):
            os.fchmod(lh.fileno(), 0o644)

        quote = shlex.quote if hasattr(shlex, "quote") else '"{}"'.format
        argv = [quote(x) for x in [pybin] + self.argv]

        msg = "[+] opened logfile [{}]\n".format(fn)
        printed += msg
        header = "t0: {:.3f}\nargv: {}\n\n{}"
        lh.write(header.format(self.E.t0, " ".join(argv), printed))
        self.logf = lh
        self.logf_base_fn = base_fn
        print(msg, end="")

    def run(self) -> None:
        """
        Run the TCP server, hand the helper tasks to Daemons, install the
        signal handlers, and then stay here until a stop was requested.
        """
        self.tcpsrv.run()

        if getattr(self.args, "z_chk", 0):
            if getattr(self.args, "zm", False) or getattr(self.args, "zs", False):
                Daemon(self.tcpsrv.netmon, "netmon")

        Daemon(self.thr_httpsrv_up, "sig-hsrv-up2")

        sigs = [signal.SIGINT, signal.SIGTERM]
        if not ANYWIN:
            sigs += [signal.SIGUSR1]

        for sig in sigs:
            signal.signal(sig, self.signal_handler)

        # macos hangs after shutdown on sigterm with while-sleep,
        # windows cannot ^c stop_cond (and win10 does the macos thing but winxp is fine??)
        # linux is fine with both,
        # never lucky
        if not ANYWIN:
            self.stop_thr()
            return

        # msys-python probably fine but >msys-python
        Daemon(self.stop_thr, "svchub-sig")

        try:
            while not self.stop_req:
                time.sleep(1)
        except:
            pass

        self.shutdown()
        # cant join; eats signals on win10
        while not self.stopped:
            time.sleep(0.1)

    def start_zeroconf(self) -> None:
        self.zc_ngen += 1

        if getattr(self.args, "zm", False):
            try:
                from .mdns import MDNS

                if self.mdns:
                    self.mdns.stop(True)

                self.mdns = MDNS(self, self.zc_ngen)
                Daemon(self.mdns.run, "mdns")
            except:
                self.log("root", "mdns startup failed;\n" + min_ex(), 3)

        if getattr(self.args, "zs", False):
            try:
                from .ssdp import SSDPd

                if self.ssdp:
                    self.ssdp.stop()

                self.ssdp = SSDPd(self, self.zc_ngen)
                Daemon(self.ssdp.run, "ssdp")
            except:
                self.log("root", "ssdp startup failed;\n" + min_ex(), 3)

    def reload(self, rescan_all_vols: bool, up2k: bool) -> str:
        t = "config has been reloaded"
        with self.reload_mutex:
            self.log("root", "reloading config")
            self.asrv.reload(9 if up2k else 4)
            if up2k:
                self.up2k.reload(rescan_all_vols)
                t += "; volumes are now reinitializing"
            else:
                self.log("root", "reload done")
            self.broker.reload()
        return t

    def _reload_sessions(self) -> None:
        with self.asrv.mutex:
            self.asrv.load_sessions(True)
        self.broker.reload_sessions()

    def stop_thr(self) -> None:
        while not self.stop_req:
            with self.stop_cond:
                self.stop_cond.wait(9001)

            if self.reload_req:
                self.reload_req = False
                self.reload(True, True)

        self.shutdown()

    def kill9(self, delay: float = 0.0) -> None:
        """
        Forcibly kill this process, optionally after `delay` seconds.

        With a delay above 0.01 it sleeps that long, prints a notice,
        and waits another 0.1 sec before killing. Uses `taskkill /f`
        on windows and SIGKILL everywhere else.
        """
        if delay > 0.01:
            time.sleep(delay)
            print("component stuck; issuing sigkill")
            time.sleep(0.1)

        pid = os.getpid()
        if not ANYWIN:
            os.kill(pid, signal.SIGKILL)
        else:
            os.system("taskkill /f /pid {}".format(pid))

    def signal_handler(self, sig: int, frame: Optional[FrameType]) -> None:
        if self.stopping:
            if self.nsigs <= 0:
                try:
                    threading.Thread(target=self.pr, args=("OMBO BREAKER",)).start()
                    time.sleep(0.1)
                except:
                    pass

                self.kill9()
            else:
                self.nsigs -= 1
                return

        if not ANYWIN and sig == signal.SIGUSR1:
            self.reload_req = True
        else:
            self.stop_req = True

        with self.stop_cond:
            self.stop_cond.notify_all()

    def shutdown(self) -> None:
        if self.stopping:
            return

        # start_log_thrs(print, 0.1, 1)

        self.stopping = True
        self.stop_req = True
        with self.stop_cond:
            self.stop_cond.notify_all()

        ret = 1
        try:
            self.pr("OPYTHAT")
            tasks = []
            slp = 0.0

            if self.mdns:
                tasks.append(Daemon(self.mdns.stop, "mdns"))
                slp = time.time() + 0.5

            if self.ssdp:
                tasks.append(Daemon(self.ssdp.stop, "ssdp"))
                slp = time.time() + 0.5

            self.broker.shutdown()
            self.tcpsrv.shutdown()
            self.up2k.shutdown()

            if hasattr(self, "smbd"):
                slp = max(slp, time.time() + 0.5)
                tasks.append(Daemon(self.smbd.stop, "smbd"))

            if self.thumbsrv:
                self.thumbsrv.shutdown()

                for n in range(200):  # 10s
                    time.sleep(0.05)
                    if self.thumbsrv.stopped():
                        break

                    if n == 3:
                        self.log("root", "waiting for thumbsrv (10sec)...")

            if hasattr(self, "smbd"):
                zf = max(time.time() - slp, 0)
                Daemon(self.kill9, a=(zf + 0.5,))

            while time.time() < slp:
                if not next((x for x in tasks if x.is_alive), None):
                    break

                time.sleep(0.05)

            self.log("root", "nailed it")
            ret = self.retcode
        except:
            self.pr("\033[31m[ error during shutdown ]\n{}\033[0m".format(min_ex()))
            raise
        finally:
            if self.args.wintitle:
                print("\033]0;\033\\", file=sys.stderr, end="")
                sys.stderr.flush()

            self.pr("\033[0m", end="")
            if self.logf:
                self.logf.close()

            self.stopped = True
            sys.exit(ret)

    def _log_disabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None:
        if not self.logf:
            return

        with self.log_mutex:
            dt = datetime.now(self.tz)
            ts = self.log_dfmt % (
                dt.year,
                dt.month * 100 + dt.day,
                (dt.hour * 100 + dt.minute) * 100 + dt.second,
                dt.microsecond // self.log_div,
            )

            if c and not self.args.no_ansi:
                if isinstance(c, int):
                    msg = "\033[3%sm%s\033[0m" % (c, msg)
                elif "\033" not in c:
                    msg = "\033[%sm%s\033[0m" % (c, msg)
                else:
                    msg = "%s%s\033[0m" % (c, msg)

            if "\033" in src:
                src += "\033[0m"

            if "\033" in msg:
                msg += "\033[0m"

            self.logf.write("@%s [%-21s] %s\n" % (ts, src, msg))
            if not self.args.no_logflush:
                self.logf.flush()

            if dt.day != self.cday or dt.month != self.cmon:
                self._set_next_day(dt)

    def _set_next_day(self, dt: datetime) -> None:
        if self.cday and self.logf and self.logf_base_fn != self._logname():
            self.logf.close()
            self._setup_logfile("")

        self.cday = dt.day
        self.cmon = dt.month

    def _log_enabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None:
        """handles logging from all components"""
        with self.log_mutex:
            dt = datetime.now(self.tz)
            if dt.day != self.cday or dt.month != self.cmon:
                zs = "{}\n" if self.no_ansi else "\033[36m{}\033[0m\n"
                zs = zs.format(dt.strftime("%Y-%m-%d"))
                print(zs, end="")
                self._set_next_day(dt)
                if self.logf:
                    self.logf.write(zs)

            fmt = "\033[36m%s \033[33m%-21s \033[0m%s\n"
            if self.no_ansi:
                fmt = "%s %-21s %s\n"
                if "\033" in msg:
                    msg = RE_ANSI.sub("", msg)
                if "\033" in src:
                    src = RE_ANSI.sub("", src)
            elif c:
                if isinstance(c, int):
                    msg = "\033[3%sm%s\033[0m" % (c, msg)
                elif "\033" not in c:
                    msg = "\033[%sm%s\033[0m" % (c, msg)
                else:
                    msg = "%s%s\033[0m" % (c, msg)

            ts = self.log_efmt % (
                dt.hour,
                dt.minute,
                dt.second,
                dt.microsecond // self.log_div,
            )
            msg = fmt % (ts, src, msg)
            try:
                print(msg, end="")
            except UnicodeEncodeError:
                try:
                    print(msg.encode("utf-8", "replace").decode(), end="")
                except:
                    print(msg.encode("ascii", "replace").decode(), end="")
            except OSError as ex:
                if ex.errno != errno.EPIPE:
                    raise

            if self.logf:
                self.logf.write(msg)
                if not self.args.no_logflush:
                    self.logf.flush()

    def pr(self, *a: Any, **ka: Any) -> None:
        try:
            with self.log_mutex:
                print(*a, **ka)
        except OSError as ex:
            if ex.errno != errno.EPIPE:
                raise

    def check_mp_support(self) -> str:
        """
        Check if multiprocessing can be used here.

        Returns an empty string if it can, otherwise the reason why not.
        The check is refused on macos (unless the PRTY_FORCE_MP env-var
        is set) and on python older than 3.3; otherwise one item is
        sent through an `mp.Queue` to see if that works.
        """
        if MACOS and not os.environ.get("PRTY_FORCE_MP"):
            return "multiprocessing is wonky on mac osx;"

        if sys.version_info < (3, 3):
            return "need python 3.3 or newer for multiprocessing;"

        try:
            q: mp.Queue[tuple[str, str]] = mp.Queue(1)
            q.put(("foo", "bar"))
            head = q.get()[0]
            if head != "foo":
                raise Exception()
        except:
            return "multiprocessing is not supported on your platform;"

        return ""

    def check_mp_enable(self) -> bool:
        if self.args.j == 1:
            return False

        try:
            if mp.cpu_count() <= 1 and not os.environ.get("PRTY_FORCE_MP"):
                raise Exception()
        except:
            self.log("svchub", "only one CPU detected; multiprocessing disabled")
            return False

        try:
            # support vscode debugger (bonus: same behavior as on windows)
            mp.set_start_method("spawn", True)
        except AttributeError:
            # py2.7 probably, anyways dontcare
            pass

        err = self.check_mp_support()
        if not err:
            return True
        else:
            self.log("svchub", err)
            self.log("svchub", "cannot efficiently use multiple CPU cores")
            return False

    def sd_notify(self) -> None:
        try:
            zb = os.getenv("NOTIFY_SOCKET")
            if not zb:
                return

            addr = unicode(zb)
            if addr.startswith("@"):
                addr = "\0" + addr[1:]

            t = "".join(x for x in addr if x in string.printable)
            self.log("sd_notify", t)

            sck = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
            sck.connect(addr)
            sck.sendall(b"READY=1")
        except:
            self.log("sd_notify", min_ex())

    def log_stacks(self) -> None:
        td = time.time() - self.tstack
        if td < 300:
            self.log("stacks", "cooldown {}".format(td))
            return

        self.tstack = time.time()
        zs = "{}\n{}".format(VERSIONS, alltrace())
        zb = zs.encode("utf-8", "replace")
        zb = gzip.compress(zb)
        zs = ub64enc(zb).decode("ascii")
        self.log("stacks", zs)
